package com.atguigu.flinksql.day13.sql;


import com.atguigu.flinksql.day13.udf.upFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL demo: declares a Kafka-backed table, registers the user-defined
 * function {@link upFunction} under the SQL name {@code my_up}, and prints the
 * result of calling that function on the {@code name} column.
 */
public class Test05_UDF {

    /** SQL name under which {@link upFunction} is registered. */
    private static final String UDF_NAME = "my_up";

    /**
     * DDL for table {@code t1_pt}: three physical columns read as JSON from the
     * Kafka topic {@code test_1109}, plus {@code pt}, a computed column defined
     * as {@code PROCTIME()}.
     */
    private static final String KAFKA_TABLE_DDL =
            "CREATE TABLE t1_pt(  "
                    + "    id string,  "
                    + "    ts bigint,  "
                    + "    name string, "
                    + "    pt AS PROCTIME() "
                    + ") WITH ( "
                    + "  'connector' = 'kafka', "
                    + "  'properties.bootstrap.servers' = 'hadoop102:9092', "
                    + "  'properties.group.id' = 'test1', "
                    + "  'scan.startup.mode' = 'group-offsets', "
                    + "  'sink.partitioner' = 'fixed', "
                    + "  'topic' = 'test_1109', "
                    + "  'format' = 'json' "
                    + ")";

    /** Query that applies the registered function to the {@code name} column. */
    private static final String UDF_QUERY = "select id,my_up(name) from t1_pt";

    /**
     * Sets up the environments, creates the Kafka table, registers the UDF and
     * prints the query result.
     *
     * @param args command-line arguments; not used
     * @throws Exception if any of the Flink calls fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment (parallelism fixed to 1) and wrap
        //    it in a table environment.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // 2. Read the Kafka data through Flink SQL, extracting processing time
        //    as an extra column.
        tEnv.executeSql(KAFKA_TABLE_DDL);

        // 3. Register the UDF.
        tEnv.createTemporarySystemFunction(UDF_NAME, upFunction.class);

        // 4. Use the UDF in a query and print what it returns.
        tEnv.sqlQuery(UDF_QUERY)
                .execute()
                .print();
    }
}